package com.dtwave.cfstreaming.filter.function;

import com.alibaba.fastjson.JSONObject;
import com.dtwave.constant.FieldConstants;
import com.dtwave.utils.JsonUtils;
import org.apache.flink.api.common.functions.RichFilterFunction;

public class FilterFunction extends RichFilterFunction<JSONObject> {
    @Override
    public boolean filter(JSONObject jsonObject) throws Exception {
        String opType = JsonUtils.getString(jsonObject, true, FieldConstants.OP_TYPE);
        if (!FieldConstants.I.equals(opType) && !FieldConstants.U.equals(opType)) {
            return false;
        }
        return true;
    }
}
